package com.xiaojiezhu.spark.rdd2

import org.apache.spark.rdd.{RDD}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 对 pari的行动操作
  */
object ScalaPairAction {



  def main(args : Array[String]): Unit ={
    val conf = new SparkConf().setMaster("local").setAppName("app")
    val sc = new SparkContext(conf)

    val rdd:RDD[(Int, Int)] = sc.parallelize(List((1,2),(3,4),(3,6)))

    //reduceByKey(rdd)
    //groupByKey(rdd)
    //mapValues(rdd)
    //flatMapValues(rdd)
    //keys(rdd)
    //values(rdd)
    sortByKey(rdd)
  }

  /**
    * 	根据key排序，默认是顺序，设置ascending=false则是倒序
    * @param rdd
    */
  def sortByKey(rdd : RDD[(Int,Int)]): Unit ={
    val result = rdd.sortByKey(ascending = false)
    result.foreach(x => println(x._1 + " , " + x._2))
  }

  /**
    * 返回所有元素的value，不会去重复，scala中没有括号
    * @param rdd
    */
  def values(rdd : RDD[(Int,Int)]): Unit ={
    val result = rdd.values
    result.foreach(x => println(x))
  }

  /**
    * 返回所有元素的key，不会去重复，scala中没有括号
    * @param rdd
    */
  def keys(rdd : RDD[(Int,Int)]): Unit ={
    val result = rdd.keys
    result.foreach(x => println(x))
  }

  /**
    * 应用这个rdd元素，返回一个迭代器，然后迭代器中每个元素都会有着原有对应的key作为其key
    * @param rdd
    */
  def flatMapValues(rdd : RDD[(Int,Int)]): Unit ={
    val result = rdd.flatMapValues(x => List(-1,x))
    result.foreach(x => println(x._1 + " , " + x._2))
  }

  /**
    * 对每一个值应用一个函数，返回value，但是其Key不变
    * @param rdd
    */
  def mapValues(rdd : RDD[(Int,Int)]) : Unit={
    val result = rdd.mapValues(x => "value is " + x)
    result.foreach(x => println(x._1 + " " + x._2))
  }

  /**
    * 对具有相同键的值进行分组
    * @param rdd
    */
  def groupByKey(rdd : RDD[(Int,Int)]) : Unit={
    val result = rdd.groupByKey()
    result.foreach((x) => println(x._1 + " " + x._2))
  }

  /**
    * 合并具有相同键的值
    * @param rdd
    */
  def reduceByKey(rdd: RDD[(Int, Int)]): Unit = {
    val result : RDD[(Int, Int)] = rdd.reduceByKey((x, y) => x + y)
    result.foreach(x => println(x._1 + " " + x._2))
  }



}
